自前でデータパイプラインをサクッと構築できる「Airbyte」を試してみた
本記事は、データパイプラインを簡単に構築できるツールAirbyteでロードジョブを試していきます。
本アドベントカレンダーでは、日本でも海外でもマイナー寄りな製品を取り扱ってきてますが、多分一番知名度があるのがこのAirbyteになるかなと思います。
Airbyteについて
Airbyteは2020年にサンフランシスコで創業されたテックカンパニーです。創業からわずか1年でシリーズAに到達し、$26Mの資金調達に成功しているスタートアップで、急激な成長速度で業界から注目を集めています。
Airbyte announces $26M Series A for open-source data connector platform | TechCrunch
その最大の特徴は製品をOSSで公開しているという点でしょう。自身のAWSにデプロイすればAirbyteの機能をそのまま使えてしまう導入障壁が低さから、モダンデータスタック界隈のエンジニアから支持を集めています。最近Hashicorp社が上場して注目を集めたように、OSSを絡めてサービス展開する企業の戦略には目を見張るものがありますね。
対応しているデータソースの数もかなり豊富で、Airbyteを使えば簡単にデータレイクやデータベース間の同期ジョブを実現できます。自社でコンテナの面倒を見たくない方向けにマネージドなCloud版も用意されていますが、2021年12月11日現在ではUSのみ利用可能となっています。
さっそく、Airbyteを試していきましょう!
RDS PostgreSQLからRedshiftへデータをロードする
公式HPからdeploy Airbyte Open-source
をクリックすると、OSS版のAirbyteのドキュメントに飛びます。
Deploy Airbyte - Airbyte Documentationに従い、以下のコマンドを実行してDockerで環境を立ち上げます。
$ git clone https://github.com/airbytehq/airbyte.git $ cd airbyte $ docker-compose up
http://localhost:8000/にアクセスし、メールアドレスをしてContinue
をクリックします。
初期設定の画面にランディングします。Set up your first connection
をクリックし、コネクションのセットアップを進めていきます。
今回は、パブリックに配置したRDS PostgreSQLインスタンスに PostgreSQL サンプルのデータベースを用意する | DevelopersIO で用意したサンプルデータを、同じくパブリックのRedshiftクラスタへロードするジョブを作成していきます。まずはSourceとなるRDS PostgreSQLインスタンスの接続情報を入力します。SSH経由での接続も可能みたいなので、プライベートサブネットに配置されていても対象にできそうですね。
次にDestinationとなる、Redshiftクラスタの接続情報を入力していきます。
最後に、Connectionのセットアップを行います。特にスケジュール実行させる予定はないので、Syncの頻度はmanual
にしています。
SourceとDestination間で対象とするスキーマやテーブルをマッピングしたり、Sync mode
で、追加するか上書きするか調節も可能です。
画面下部のNormalization & Transformation
で、データの変換処理も定義できます。こちらは次節で掘り下げていくので、一旦No custom transformation
のままでいきます。Set up connection
をクリックして確定。
オンボーディングのガイドは閉じてしまってOKです。
左メニューでConnections
をクリックすると、先ほど作成したConnectionが表示されています。Launch
をクリックしてSyncを起動させます。
RDS内のデータがローカル端末を経由してRedshiftにインサートされました。全件データを捌けるリソースを必要とするので、Docker用のメモリはある程度確保しておきましょう。今の設定ではDocker Desktopに8GBのメモリを割り当てています。
Redshift側を確認すると、無事RDS側のデータがロードされていることが確認できました。
Airbyteとdbtとの連携仕様を理解する
続いて、先ほどは飛ばしていたNormalization & Transformation
を試してみます。こちらはTransformations with SQL (Part 1/3) - Airbyte Documentationにチュートリアルが載っているので、こちらを参考に進めていきます。
AirbyteではSQLベースで変換処理の追加が可能ですが、内部的にはdbtを活用してワークフローを制御しているので、まずは仕様を紐解いていくところから始めていきます。Connectionsの画面でCreate Connection
をクリックし、新規作成を進めていきます。
Sourceとして使用するのは、https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv
に配置されているCovidのサンプルCSVです。
Destinationには、先ほどSourceとして使用していたRDS PostgreSQLを指定します。
Syncの頻度はmanual
に、Sync modeはFull refresh | Overwrite
にしておきます。Set up connection
をクリックして確定させます。
Connections
の一覧画面に戻り、Launch
をクリックして起動させます。
接続に問題がなければ成功するはずです。今回は注目して欲しい箇所は、ログが出力されている/tmp/workspace/2/0/logs.log
というパスです。Dockerコンテナ内の/tmp/workspace/2/0/
というディレクトリにリソースが詰まっているみたいです。
以下のコマンドを実行していき、/tmp/workspace/2/0/
内のリソースをローカルにコピーしていきます。厳密には、/tmp/workspace/2/0/build/run/airbyte_utils/models/generated/
というディレクトリをコピーします。
$ NORMALIZE_WORKSPACE="2/0/" $ docker cp airbyte-server:/tmp/workspace/${NORMALIZE_WORKSPACE}/build/run/airbyte_utils/models/generated/ models/
ディレクトリにはデータロード時に使用するSQLが入っています。ここに転記するには行数が多いので、興味ある方はTransformations with SQL (Part 1/3) - Airbyte Documentationを参照してください。
$ find models models models/airbyte_tables models/airbyte_tables/public models/airbyte_tables/public/covid_epidemiology.sql $ vi models/airbyte_tables/public/covid_epidemiology.sql
次にAirbytesに組み込まれているdbtのプロジェクトを見ていきます。dbt debug
コマンドをAirbyteのCLI経由で実行させると、RDSのコネクションの情報が確認できます。
$ docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization debug --profiles-dir=. --project-dir=. Running with dbt=0.21.1 dbt version: 0.21.1 python version: 3.8.12 python path: /usr/local/bin/python os info: Linux-5.10.76-linuxkit-x86_64-with-glibc2.2.5 Using profiles.yml file at /data/2/0/normalize/profiles.yml Using dbt_project.yml file at /data/2/0/normalize/dbt_project.yml Configuration: profiles.yml file [OK found and valid] dbt_project.yml file [OK found and valid] Required dependencies: - git [OK found] Connection: host: haruta-demo.XXXXXXXXXXXX.ap-northeast-1.rds.amazonaws.com port: 5432 user: postgres database: postgres schema: public search_path: None keepalives_idle: 0 sslmode: None Connection test: [OK connection ok]
接続確認後dbt run
を実行すれば、先ほどAirbyteの画面から実行したロードと同じジョブを実行することができます。
$ docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization run --profiles-dir=. --project-dir=. Running with dbt=0.21.1 Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed. Partial parsing enabled, no changes found, skipping parsing [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources. There are 2 unused configuration paths: - models.airbyte_utils.generated.airbyte_incremental - models.airbyte_utils.generated.airbyte_views Found 4 models, 0 tests, 0 snapshots, 0 analyses, 479 macros, 0 operations, 0 seed files, 1 source, 0 exposures 09:21:53 | Concurrency: 32 threads (target='prod') 09:21:53 | 09:21:53 | 1 of 1 START table model public.covid_epidemiology........................................................... [RUN] 09:21:53 | 1 of 1 OK created table model public.covid_epidemiology...................................................... [SELECT 17911 in 0.64s] 09:21:54 | 09:21:54 | Finished running 1 table model in 1.88s. Completed successfully Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
自前のdbtプロジェクトをAirbyteで実行する
内部構造も理解できたところで、Airbyteの画面からNormalization & Transformation
を設定していきます。ConnectionのSettings
を開き、Custom transformationセクションにあるAdd transformation
をクリックします。
仕様としては、Gitリポジトリに上がっているdbtプロジェクトをインポートし、エントリポイントを指定して実行できるようになっています。チュートリアルに倣って、dbt公式で用意しているサンプルプロジェクトを指定して、dbt seed
、dbt run
、dbt test
の3コマンドを実行する変換処理を追加しました。
3つ設定すると画面上ではこんな見た目になります。さっそく実行していきます。
無事完了しました。もちろん、この変換処理ではSourceで設定しているCSVデータに対しては何も変換処理をしていませんが、内部的に外部のdbtプロジェクトが動いていることが確認できます。
PostgreSQLクライアントからDB内を確認すると、新たにサンプルデータが追加されていることが確認できました。
デモの実施は以上です!
所感
Airbyte、かなりクオリティの高いサービスでした。データソースの種類も豊富なので、さまざまサービスやリソース間のデータを単純にロードするケースでは、個人的にも使っていきたいですね。自分でインスタンス立てて気軽に試せる点も◎。
Airbyteで変換処理を追加するためにはdbtの理解も要求されるので、既存のパイプラインツールをAirbyteにリプレースする場合は、変換処理もdbt流に書き換えておく必要がありそうです。まさにモダンデータスタックに移行するぞ!という気概のある方向けの、文字通りモダンなサービスでした。
本アドベントカレンダーでは、今話題のデータ関連SaaSを取り上げていきますので、引き続き乞うご期待ください!